[[streams-kafka-streams]]
= Apache Kafka Streams Support

Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for https://kafka.apache.org/documentation/streams[Kafka Streams].
To use it from a Spring application, the `kafka-streams` jar must be present on classpath.
It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.

[[basics]]
== Basics

The reference Apache Kafka Streams documentation suggests the following way of using the API:

[source, java]
----
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();
----

So, we have two main components:

* `StreamsBuilder`: With an API to build `KStream` (or `KTable`) instances.
* `KafkaStreams`: To manage the lifecycle of those instances.

NOTE: All `KStream` instances exposed to a `KafkaStreams` instance by a single `StreamsBuilder` are started and stopped at the same time, even if they have different logic.
In other words, all streams defined by a `StreamsBuilder` are tied with a single lifecycle control.
Once a `KafkaStreams` instance has been closed by `streams.close()`, it cannot be restarted.
Instead, a new `KafkaStreams` instance to restart stream processing must be created.

[[streams-spring]]
== Spring Management

To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces `StreamsBuilderFactoryBean`.
This is an `AbstractFactoryBean` implementation to expose a `StreamsBuilder` singleton instance as a bean.
The following example creates such a bean:

[source, java]
----
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
----

IMPORTANT: Starting with version 2.2, the stream configuration is now provided as a `KafkaStreamsConfiguration` object rather than a `StreamsConfig`.

The `StreamsBuilderFactoryBean` also implements `SmartLifecycle` to manage the lifecycle of an internal `KafkaStreams` instance.
Similar to the Kafka Streams API, you must define the `KStream` instances before you start the `KafkaStreams`.
That also applies for the Spring API for Kafka Streams.
Therefore, when you use default `autoStartup = true` on the `StreamsBuilderFactoryBean`, you must declare `KStream` instances on the `StreamsBuilder` before the application context is refreshed.
For example, `KStream` can be a regular bean definition, while the Kafka Streams API is used without any impacts.
The following example shows how to do so:

[source, java]
----
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}
----

If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the `StreamsBuilderFactoryBean` bean directly by using the factory bean (`&`) https://docs.spring.io/spring-framework/reference/core/beans/factory-extension.html#beans-factory-extension-factorybean[prefix].
Since `StreamsBuilderFactoryBean` uses its internal `KafkaStreams` instance, it is safe to stop and restart it again.
A new `KafkaStreams` is created on each `start()`.
You might also consider using different `StreamsBuilderFactoryBean` instances, if you would like to control the lifecycles for `KStream` instances separately.

You also can specify `KafkaStreams.StateListener`, `Thread.UncaughtExceptionHandler`, and `StateRestoreListener` options on the `StreamsBuilderFactoryBean`, which are delegated to the internal `KafkaStreams` instance.
Also, apart from setting those options indirectly on `StreamsBuilderFactoryBean`, starting with _version 2.1.5_, you can use a `KafkaStreamsCustomizer` callback interface to configure an inner `KafkaStreams` instance.
Note that `KafkaStreamsCustomizer` overrides the options provided by `StreamsBuilderFactoryBean`.
If you need to perform some `KafkaStreams` operations directly, you can access that internal `KafkaStreams` instance by using `StreamsBuilderFactoryBean.getKafkaStreams()`.
You can autowire `StreamsBuilderFactoryBean` bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:

[source,java]
----
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
----

Alternatively, you can add `@Qualifier` for injection by name if you use interface bean definition.
The following example shows how to do so:

[source,java]
----
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
----

Starting with version 2.4.1, the factory bean has a new property `infrastructureCustomizer` with type `KafkaStreamsInfrastructureCustomizer`; this allows customization of the `StreamsBuilder` (e.g. to add a state store) and/or the `Topology` before the stream is created.

[source, java]
----
public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}
----

Default no-op implementations are provided to avoid having to implement both methods if one is not required.

A `CompositeKafkaStreamsInfrastructureCustomizer` is provided, for when you need to apply multiple customizers.

[[streams-micrometer]]
== KafkaStreams Micrometer Support

Introduced in version 2.5.3, you can configure a `KafkaStreamsMicrometerListener` to automatically register micrometer meters for the `KafkaStreams` object managed by the factory bean:

[source, java]
----
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
----

[[serde]]
== Streams JSON Serialization and Deserialization

For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a `JsonSerde` implementation that uses JSON, delegating to the `JsonSerializer` and `JsonDeserializer` described in xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion].
The `JsonSerde` implementation provides the same configuration options through its constructor (target type or `ObjectMapper`).
In the following example, we use the `JsonSerde` to serialize and deserialize the `Cat` payload of a Kafka stream (the `JsonSerde` can be used in a similar fashion wherever an instance is required):

[source,java]
----
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
----

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

[source, java]
----
stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");
----

[[using-kafkastreambrancher]]
== Using `KafkaStreamBrancher`

The `KafkaStreamBrancher` class introduces a more convenient way to build conditional branches on top of `KStream`.

Consider the following example that does not use `KafkaStreamBrancher`:

[source,java]
----
KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
----

The following example uses `KafkaStreamBrancher`:

[source,java]
----
new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining
----

[[streams-config]]
== Configuration

To configure the Kafka Streams environment, the `StreamsBuilderFactoryBean` requires a `KafkaStreamsConfiguration` instance.
See the Apache Kafka https://kafka.apache.org/0102/documentation/#streamsconfigs[documentation] for all possible options.

IMPORTANT: Starting with version 2.2, the stream configuration is now provided as a `KafkaStreamsConfiguration` object, rather than as a `StreamsConfig`.

To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the `@EnableKafkaStreams` annotation, which you should place on a `@Configuration` class.
All you need is to declare a `KafkaStreamsConfiguration` bean named `defaultKafkaStreamsConfig`.
A `StreamsBuilderFactoryBean` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context.
You can declare and use any additional `StreamsBuilderFactoryBean` beans as well.
You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`.
If there are multiple such beans, they will be applied according to their `Ordered.order` property.


=== Cleanup & Stop configuration

When the factory is stopped, the `KafkaStreams.close()` is called with 2 parameters :

* closeTimeout : how long to to wait for the threads to shutdown (defaults to `DEFAULT_CLOSE_TIMEOUT` set to 10 seconds). Can be configured using `StreamsBuilderFactoryBean.setCloseTimeout()`.
* leaveGroupOnClose : to trigger consumer leave call from the group (defaults to `false`). Can be configured using `StreamsBuilderFactoryBean.setLeaveGroupOnClose()`.

By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
Starting with version 2.7, the default is to never clean up local state.

[[streams-header-enricher]]
== Header Enricher

Version 3.0 added the `HeaderEnricherProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `HeaderEnricher` which implemented the deprecated `Transformer` interface.
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:

* `record` - the `org.apache.kafka.streams.processor.api.Record` (`key`, `value`, `timestamp`, `headers`)
* `key` - the key of the current record
* `value` - the value of the current record
* `context` - the `ProcessorContext`, allowing access to the current record metadata

The expressions must return a `byte[]` or a `String` (which will be converted to `byte[]` using `UTF-8`).

To use the enricher within a stream:

[source, java]
----
.process(() -> new HeaderEnricherProcessor(expressions))
----

The processor does not change the `key` or `value`; it simply adds headers.

IMPORTANT: You need a new instance for each record.

[source, java]
----
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
----

Here is a simple example, adding one literal header and one variable:

[source, java]
----
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);
----

[[streams-messaging]]
== `MessagingProcessor`

Version 3.0 added the `MessagingProcessor` extension of `ContextualProcessor`, providing the same functionality as the deprecated `MessagingTransformer` which implemented the deprecated `Transformer` interface.
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
The transformer requires an implementation of `MessagingFunction`.

[source, java]
----
@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}
----

Spring Integration automatically provides an implementation using its `GatewayProxyFactoryBean`.
It also requires a `MessagingMessageConverter` to convert the key, value and metadata (including headers) to/from a Spring Messaging `Message<?>`.
See https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#streams-integration[[Calling a Spring Integration Flow from a `KStream`]] for more information.

[[streams-deser-recovery]]
== Recovery from Deserialization Exceptions

Version 2.3 introduced the `RecoveringDeserializationExceptionHandler` which can take some action when a deserialization exception occurs.
Refer to the Kafka documentation about `DeserializationExceptionHandler`, of which the `RecoveringDeserializationExceptionHandler` is an implementation.
The `RecoveringDeserializationExceptionHandler` is configured with a `ConsumerRecordRecoverer` implementation.
The framework provides the `DeadLetterPublishingRecoverer` which sends the failed record to a dead-letter topic.
See xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records] for more information about this recoverer.

To configure the recoverer, add the following properties to your streams configuration:

[source, java]
----
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}
----

Of course, the `recoverer()` bean can be your own implementation of `ConsumerRecordRecoverer`.

[[kafka-streams-iq-support]]
== Interactive Query Support

Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams.
Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application.
Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that.
To learn more about interacive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article].
The support in Spring for Apache Kafka is centered around an API called `KafkaStreamsInteractiveQueryService` which is a facade around interactive queries APIs in Kafka Streams library.
An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name.

The following code snippet shows an example.

[source, java]
----
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}
----

Assuming that a Kafka Streams application has a state store called `app-store`, then that store can be retrieved via the `KafkStreamsInteractiveQuery` API as show below.

[source, java]
----
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
----

Once an application gains access to the state store, then it can query from it for key-value information.

In this case, the state store that the application uses is a read-only key value store.
There are other types of state stores that a Kafka Streams application can use.
For instance, if an application prefers to query a window based store, it can build that store in the Kafka Streams application business logic and then later on retrieve it.
Because of this reason, the API to retrieve the queryable store in `KafkaStreamsInteractiveQueryService` has a generic store type signature, so that the end-user can assign the proper type.

Here is the type signature from the API.

[source, java]
----
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
----

When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example.

=== Retrying State Store Retrieval

When trying to retrieve the state store using the `KafkaStreamsInteractiveQueryService`, there is a chance that the state store might not be found for various reasons.
If those reasons are transitory, `KafkaStreamsInteractiveQueryService` provides an option to retry the retrieval of the state store by allowing to inject a custom `RetryTemplate`.
By default, the `RetryTemmplate` that is used in `KafkaStreamsInteractiveQueryService` uses a maximum attempts of three with a fixed backoff of one second.

Here is how you can inject a custom `RetryTemmplate` into `KafkaStreamsInteractiveQueryService` with the maximum attempts of ten.

[source, java]
----
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}
----

=== Querying Remote State Stores

The API shown above for retrieving the state store - `retrieveQueryableStore` is intended for locally available key-value state stores.
In productions settings, Kafka Streams applications are most likely distributed based on the number of partitions.
If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance maybe responsible for processing a single partition from the topic.
In this scenario, calling `retrieveQueryableStore` may not give the correct result that an instance is looking for, although it might return a valid store.
Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key.
If the instance that is calling `retrieveQueryableStore` is looking for information about a key that this instance does not host, then it will not receive any data.
This is because the current Kafka Streams instance does not know anything about this key.
To fix this, the calling instance first needs to make sure that they have the host information for the Kafka Streams processor instance where the particular key is hosted.
This can be retrieved from any Kafka Streams instance under the same `application.id` as below.

[source, java]
----
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
----

In the example code above, the calling instance is querying for a particular key `12345` from the state-store named `app-store`.
The API also needs a corresponding key serializer, which in this case is the `IntegerSerializer`.
Kafka Streams looks through all it's instances under the same `application.id` and tries to find which instance hosts this particular key,
Once found, it returns that host information as a `HostInfo` object.

This is how the API looks like:

[source, java]
----
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
----

When using multiple instances of the Kafka Streams processors of the same `application.id` in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one.
See this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[article] for more details on this.
When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies.
Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the `HostInfo` where the key is hosted is known to the instance.

If the key hosting the instance is the current instance, then the application does not need to call the RPC mechanism, but rather make an in-JVM call.
However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance.
To fix this issue, `KafkaStreamsInteractiveQueryService` provides a convenient API for querying the current host information via an API method `getCurrentKafkaStreamsApplicationHostInfo()` that returns the current `HostInfo`.
The idea is that the application can first acquire information about where the key is held, and then compare the `HostInfo` with the one about the current instance.
If the `HostInfo` data matches, then it can proceed with a simple JVM call via the `retrieveQueryableStore`, otherwise go with the RPC option.

[[kafka-streams-example]]
== Kafka Streams Example

The following example combines the various topics we have covered in this chapter:

[source, java]
----
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}
----
